﻿using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Threading;

using vJine.Core.IO;
using vJine.Core.Log;
using vJine.Core.Task;
using vJine.Core.IoC;
using vJine.Core.IO.Bin;

namespace vJine.Core.MQ {
    public class MessageClient<T> {
        #region Client

        internal BinHelper<MessagePackage<T>> Tx = null;

        TaskQueue<MessagePackage<T>> PMQ_Send = null, PMQ_Receive = null;
        public MessageClient()
            : this("127.0.0.1", 1239) {
        }

        public MessageClient(string host, int port) {
            this.host = host; this.port = port;

            this.PMQ_Send = new TaskQueue<MessagePackage<T>>(1, 100, null, (MessagePackage<T> M) => {
                this.Write(M);
            }, "Write");

            this.PMQ_Receive = new TaskQueue<MessagePackage<T>>(10, () => {
                try {
                    while (this.Read()) ;
                } catch (OutOfMemoryException ex) {
                    System.Console.WriteLine(LogManager.dump("R:OOM", ex));
                } catch (IOException ex) {
                    System.Console.WriteLine("R:IO:" + ex.Message + "," + ex.Source);
                } catch (Exception ex) {
                    System.Console.WriteLine("R:EX:" + ex.Message + "," + ex.Source);
                }
            });
        }

        internal MessageClient(NetworkStream stream, TaskQueue<MessagePackage<T>> task_send, TaskQueue<MessagePackage<T>> task_receive) {
            this.Tx = new BinHelper<MessagePackage<T>>(stream);
            this.PMQ_Send = task_send; this.PMQ_Receive = task_receive;
        }

        internal static void service_daemon(Exec<MessageClient<T>> client_enqueue, Exec<string, MessagePackage<T>> msg_enqueue, MessageClient<T> mClient) {
            Call<MessagePackage<T>> msg_read = mClient.Tx.Read;
            NewMessageEventHandler<T> new_message = mClient.NewMessage;

            try {
                MessagePackage<T> Mnew = msg_read();

                if (Mnew.Cmd == MessageCommand.Close) {
                    Mnew = null; return;
                }

                if (new_message == null) {
                    msg_enqueue(Mnew.Topic, Mnew);
                } else {
                    NewMessageEventArgs<T> Arg =
                        new NewMessageEventArgs<T>() { Owner = mClient, Message = Mnew, Handled = false };

                    try {
                        new_message(Arg);
                    } catch (Exception ex) {
                        string x = ex.Message;
                        //TODO:忽略消息处理过程中的错误，如果出现错误，消息链则被中断；
                    }
                    if (!Arg.Handled) {
                        msg_enqueue(Mnew.Topic, Mnew);
                    }
                }

                client_enqueue(mClient);
            } catch(Exception ex) {
                string msg = ex.Message;
            }
        }
        #endregion Client

        #region Actions

        #region Open & Close
        int ConnCounter = 0; string host = "127.0.0.1"; int port = 1239;
        public void Open(string host, int port) {
            lock (this) {
                this.ConnCounter++;
                if (this.ConnCounter > 1) {
                    return;
                }

                try {
                    TcpClient Client = new TcpClient();
                    Client.Connect(host, port);
                    this._Open(Client.GetStream());

                    this.host = host; this.port = port;
                } catch (Exception ex) {
                    this.ConnCounter--; throw ex;
                }
            }
        }

        public void Open() {
            this.Open(this.host, this.port);
        }

        Stream ClientStream = null;
        public void Open(Stream stream) {
            lock (this)
            {
                this.ConnCounter++;
                if (this.ConnCounter > 1) {
                    return;
                }

                try {
                    this._Open(stream);
                } catch (Exception ex) {
                    this.ConnCounter--; throw ex;
                }
            }
        }

        long Write(MessagePackage<T> M){
            try {
                long len =
                    this.Tx.Write(M); this.Tx.Flush();

                if (M.MSent != null) {
                    M.MSent.Set();
                }

                return len;
            } catch (Exception ex) {
                M.exSent = ex; return -1;
            }
        }

        bool Read() {
            MessagePackage<T> Mnew = this.Tx.Read();

            if (Mnew.Cmd == MessageCommand.Close) {
                Mnew = null; return false;
            }

            if (this.NewMessage == null) {
                this.PMQ_Receive.Enqueue(Mnew.Topic, Mnew);
            } else {
                NewMessageEventArgs<T> Arg =
                    new NewMessageEventArgs<T>() { Owner = this, Message = Mnew, Handled = false };

                this.NewMessage(Arg);
                if (!Arg.Handled) {
                    this.PMQ_Receive.Enqueue(Mnew.Topic, Mnew);
                }
            }

            return true;
        }

        void _Open(Stream stream) {
            this.Tx = new BinHelper<MessagePackage<T>>(stream);

            this.PMQ_Send.Start(); this.PMQ_Receive.Start();

            this.ClientStream = stream;
        }

        public bool Close() {
            lock (this)
            {
                this.ConnCounter--;

                if (this.ConnCounter < 0) {
                    this.ConnCounter = 0; return true;
                }
                else if (this.ConnCounter > 0) {
                    return false;
                }

                this.Send("", default(T), MessagePriority.P5, MessageCommand.Close, false);

                this.ClientStream.Close();
                this.ClientStream = null;
            }

            return true;
        }
        #endregion Open & Close

        #region Send & Receive

        public void Send(T Data) {
            this.Send(Data, false);
        }

        public void Send(T Data, bool IsAsync) {
            this.Send("", Data, MessagePriority.P3, MessageCommand.Trans, IsAsync);
        }

        public void Send(string Name, T Data) {
            this.Send(Name, Data, false);
        }

        public void Send(string Name, T Data, bool IsAsync) {
            this.Send(Name, Data, MessagePriority.P3, MessageCommand.Trans, IsAsync);
        }

        public void Send(string Name, T Data, MessagePriority Priority) {
            this.Send(Name, Data, Priority, MessageCommand.Trans, false);
        }

        public void Send(string Name, T Data, MessagePriority Priority, bool IsAsync) {
            this.Send(Name, Data, Priority, MessageCommand.Trans, IsAsync);
        }

        internal void Send(string Name, T Data, MessagePriority Priority, MessageCommand Cmd, bool IsAsync) {
            MessagePackage<T> M = new MessagePackage<T>() { Topic = Name, Data = Data, Cmd = Cmd };
            this.Send(Priority, M, IsAsync);
        }

        internal void Send(MessagePackage<T> M) {
            this.Send(M, false);
        }

        internal void Send(MessagePackage<T> M, bool IsAsync) {
            this.Send(MessagePriority.P3, M, IsAsync);
        }

        internal void Send(MessagePriority Priority, MessagePackage<T> M, bool IsAsync) {
            M.Write = this.Write;

            if (IsAsync) {
                this.PMQ_Send.Enqueue(M, (int)Priority);
            } else {
                M.MSent = new ManualResetEvent(false);
                this.PMQ_Send.Enqueue(M, (int)Priority);
                M.MSent.WaitOne(); M.MSent.Close(); M.MSent = null;
                Exception exSent = M.exSent; M = null;
                if (exSent != null) {
                    throw exSent;
                }
            }
        }

        public object Receive() {
            return this.Receive("");
        }

        public object Receive(string Name) {
            MessagePackage<T> M = null;
            do {
                M = this.PMQ_Receive.Dequeue(Name);
                if (M == null) {
                    continue;
                }

                object objReturn = M.Data; M = null;
                return objReturn;
            } while (true);
        }
        #endregion Send & Receive

        #region Topics

        public event NewMessageEventHandler<T> NewMessage;

        Dictionary<string, NewMessageEventHandler<T>> Topics = new Dictionary<string, NewMessageEventHandler<T>>();

        public NewMessageEventHandler<T> this[string Topic] {
            get {
                lock (this.Topics) {
                    if (this.Topics.Count == 0) {
                        this.NewMessage += new NewMessageEventHandler<T>(Topics_NewMessage);
                    }

                    if (!this.Topics.ContainsKey(Topic)) {
                        this.Topics.Add(Topic, null);
                    }

                    return this.Topics[Topic];
                }
            }
            set {
                lock (this.Topics) {
                    if (this.Topics.Count == 0) {
                        this.NewMessage += new NewMessageEventHandler<T>(Topics_NewMessage);
                    }

                    if (!this.Topics.ContainsKey(Topic)) {
                        this.Topics.Add(Topic, new NewMessageEventHandler<T>(value));
                    } else {
                        this.Topics[Topic] += new NewMessageEventHandler<T>(value);
                    }
                }
            }
        }

        void Topics_NewMessage(NewMessageEventArgs<T> Arg) {
            if (!this.Topics.ContainsKey(Arg.Message.Topic)) {
                return;
            }

            this.Topics[Arg.Message.Topic].Invoke(Arg);
        }
        #endregion Topics
        #endregion Actions
    }
}